{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Spark SQL with Python on CSV"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>dispatching_base_number</th>\n",
       "      <th>date</th>\n",
       "      <th>active_vehicles</th>\n",
       "      <th>trips</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>B02512</td>\n",
       "      <td>1/1/2015</td>\n",
       "      <td>190</td>\n",
       "      <td>1132</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>B02765</td>\n",
       "      <td>1/1/2015</td>\n",
       "      <td>225</td>\n",
       "      <td>1765</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>B02764</td>\n",
       "      <td>1/1/2015</td>\n",
       "      <td>3427</td>\n",
       "      <td>29421</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>B02682</td>\n",
       "      <td>1/1/2015</td>\n",
       "      <td>945</td>\n",
       "      <td>7679</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>B02617</td>\n",
       "      <td>1/1/2015</td>\n",
       "      <td>1228</td>\n",
       "      <td>9537</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "  dispatching_base_number      date  active_vehicles  trips\n",
       "0                  B02512  1/1/2015              190   1132\n",
       "1                  B02765  1/1/2015              225   1765\n",
       "2                  B02764  1/1/2015             3427  29421\n",
       "3                  B02682  1/1/2015              945   7679\n",
       "4                  B02617  1/1/2015             1228   9537"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Quick view of the structure of the incoming csv file\n",
    "import pandas as pd\n",
    "pandf = pd.read_csv(\"Uber-Jan-Feb-FOIL.csv\", header=0)\n",
    "pandf.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pandf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Call read function in the sqlcontext to read the incoming file \n",
    "# as `CSV` format using the provided package as the options..\n",
    "\n",
    "df = sqlContext.read.format(\"com.databricks.spark.csv\") \\\n",
    "    .options(header='true', inferschema='true').load(\"Uber-Jan-Feb-FOIL.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(dispatching_base_number=u'B02512', date=u'1/1/2015', active_vehicles=190, trips=1132),\n",
       " Row(dispatching_base_number=u'B02765', date=u'1/1/2015', active_vehicles=225, trips=1765),\n",
       " Row(dispatching_base_number=u'B02764', date=u'1/1/2015', active_vehicles=3427, trips=29421),\n",
       " Row(dispatching_base_number=u'B02682', date=u'1/1/2015', active_vehicles=945, trips=7679),\n",
       " Row(dispatching_base_number=u'B02617', date=u'1/1/2015', active_vehicles=1228, trips=9537)]"
      ]
     },
     "execution_count": 52,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Register a `TempTable` to make the `Spark SQL` much more like SQL.\n",
    "e.g. we can use the registered temp name behave like SQL Table Name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[dispatching_base_number: string, date: string, active_vehicles: int, trips: int]"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Register the TempTable name and display the structure / schema.\n",
    "df.registerTempTable(\"uber\")\n",
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `printSchema()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- dispatching_base_number: string (nullable = true)\n",
      " |-- date: string (nullable = true)\n",
      " |-- active_vehicles: integer (nullable = true)\n",
      " |-- trips: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## SELECT `DISTINCT` `dispatching_base_number`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(dispatching_base_number=u'B02598'),\n",
       " Row(dispatching_base_number=u'B02764'),\n",
       " Row(dispatching_base_number=u'B02765'),\n",
       " Row(dispatching_base_number=u'B02617'),\n",
       " Row(dispatching_base_number=u'B02682'),\n",
       " Row(dispatching_base_number=u'B02512')]"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sqlContext.sql(\"SELECT DISTINCT dispatching_base_number from uber\").collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array(['B02512', 'B02765', 'B02764', 'B02682', 'B02617', 'B02598'], dtype=object)"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Done in Pandas\n",
    "pandf.dispatching_base_number.unique()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Support for advanced SQL functions\n",
    "\n",
    "Spark SQL has support for advanced SQL features and functions. Some are highlighed below:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### What `Bases` are the busiest?\n",
 Notice the quote">
    "> Notice that the quote marks around the column names are not apostrophes ( ' ) but backticks ( ` ), the key at the top left of the keyboard!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------------+---------+\n",
      "|dispatching_base_number|TripCount|\n",
      "+-----------------------+---------+\n",
      "|                 B02764|  1914449|\n",
      "|                 B02617|   725025|\n",
      "|                 B02682|   662509|\n",
      "|                 B02598|   540791|\n",
      "|                 B02765|   193670|\n",
      "|                 B02512|    93786|\n",
      "+-----------------------+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sqlContext.sql(\"\"\"SELECT DISTINCT(`dispatching_base_number`),\n",
    "                   SUM(`trips`) AS TripCount FROM uber\n",
    "                   GROUP BY `dispatching_base_number`\n",
    "                   ORDER BY TripCount DESC\n",
    "                \"\"\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### What `Dates` are the busiest?\n",
 Notice the quote">
    "> Notice that the quote marks can also sometimes be omitted!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---------+\n",
      "|     date|TripCount|\n",
      "+---------+---------+\n",
      "|2/20/2015|   100915|\n",
      "|2/14/2015|   100345|\n",
      "|2/21/2015|    98380|\n",
      "|2/13/2015|    98024|\n",
      "|1/31/2015|    92257|\n",
      "|2/15/2015|    89401|\n",
      "|2/27/2015|    88806|\n",
      "|2/19/2015|    88757|\n",
      "|2/28/2015|    88181|\n",
      "| 2/6/2015|    85940|\n",
      "+---------+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sqlContext.sql(\"\"\"SELECT DISTINCT (date),\n",
    "                   SUM(trips) AS TripCount FROM uber\n",
    "                   GROUP BY date\n",
    "                   ORDER BY TripCount DESC LIMIT 10\n",
    "                \"\"\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
